package flinkstudy.stream.state;

import flinkstudy.stream.environment.FlinkStreamExecutionEnvironment;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import static java.lang.Thread.currentThread;

/**
 * @author daocr
 * @date 2020/2/6
 */
public class KeyedState {

    // Shared test fixture wrapping the Flink StreamExecutionEnvironment used by each test.
    private FlinkStreamExecutionEnvironment flinkStreamExecutionEnvironment = null;


    @Before
    public void init() {
        // Recreate the environment before every test so no state leaks between test cases.
        flinkStreamExecutionEnvironment = new FlinkStreamExecutionEnvironment();
    }


    /**
     * Keyed State is always scoped to a key and can only be used in functions and operators
     * applied to a KeyedStream.
     * <p>
     * Conceptually, Keyed State is a partitioned form of Operator State with exactly one state
     * partition per key: each keyed state is logically bound to a
     * &lt;parallel-operator-instance, key&gt; pair, and since every key belongs to exactly one
     * parallel instance of a keyed operator, this simplifies to &lt;operator, key&gt;.
     * <p>
     * Keyed State is further organized into Key Groups, the smallest unit in which Flink
     * redistributes keyed state. There are as many Key Groups as the configured maximum
     * parallelism; at runtime each parallel instance of a keyed operator serves the keys of one
     * or more Key Groups.
     *
     * @throws Exception if job execution fails
     */
    @Test
    public void keyedStateV1() throws Exception {

        StreamExecutionEnvironment env = flinkStreamExecutionEnvironment.streamExecutionEnvironment;

        env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(2L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                // Typed KeySelector instead of the deprecated tuple-index keyBy(int...);
                // grouping semantics are unchanged (group by f0).
                .keyBy(value -> value.f0)
                .flatMap(new keyedStateV1WindowAverage())
                .print();

        env.execute();

    }

    /**
     * Computes a running average of {@code f1} per key using keyed {@link ValueState}.
     * State layout: Tuple2&lt;count, runningSum&gt;; emits (key, runningSum / count) for
     * every input element.
     */
    public static class keyedStateV1WindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        // Per-key state: f0 = number of elements seen, f1 = running sum of input f1 values.
        private transient ValueState<Tuple2<Long, Long>> sum;

        @Override
        public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

            // 2. Read the current state for this key. ValueState.value() returns null on
            //    first access (no deprecated default-value descriptor is used anymore),
            //    so fall back to the (0, 0) starting point before any use.
            Tuple2<Long, Long> currentSum = sum.value();
            if (currentSum == null) {
                currentSum = Tuple2.of(0L, 0L);
            }

            System.out.println(currentThread().getName() + "currentSum qian : " + currentSum);

            // 3. Fold the new element into the aggregate.
            currentSum.f0 += 1;
            currentSum.f1 += input.f1;

            // 4. Write the aggregate back to keyed state.
            sum.update(currentSum);

            System.out.println(currentThread().getName() + "currentSum hou : " + currentSum);

            // Emit the running average for this key (integer division, as before).
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));

        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // 1. Declare the state descriptor. TypeInformation is supplied explicitly via a
            //    TypeHint because Tuple2's generic parameters are erased at runtime.
            TypeInformation<Tuple2<Long, Long>> typeInfo = TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
            });

            // Diamond operator fixes the previous raw-type usage; the deprecated 3-arg
            // constructor (with a default value) is replaced by the null check in flatMap(),
            // which supplies the same (0, 0) starting value.
            ValueStateDescriptor<Tuple2<Long, Long>> average = new ValueStateDescriptor<>("average", typeInfo);

            // Optional state TTL configuration, kept for reference:
            // StateTtlConfig ttlConfig = StateTtlConfig
            //         .newBuilder(Time.seconds(1))
            //         .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            //         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            //         .build();
            // average.enableTimeToLive(ttlConfig);

            sum = getRuntimeContext().getState(average);
        }
    }

    /**
     * With Operator State (non-keyed state), each state handle is bound to one parallel
     * operator instance.
     * The Kafka connector is a good example of Operator State in Flink: every parallel Kafka
     * consumer instance keeps a map of topic partitions and offsets as its Operator State.
     * When the parallelism changes, the Operator State interface supports redistributing state
     * among the parallel instances; several redistribution schemes exist.
     * <p>
     * Reference: https://www.jianshu.com/p/e9a330399b30
     * Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html#writing-new-savepoints
     */
    @Test
    public void operatorState() throws Exception {

        StreamExecutionEnvironment env = flinkStreamExecutionEnvironment.streamExecutionEnvironment;
        // Checkpoint every 10_001 ms — same value as the original "1000_1" literal,
        // just written unambiguously.
        env.getCheckpointConfig().setCheckpointInterval(10_001);

        DataStreamSource<Tuple2<Long, Long>> streamSource = env.addSource(new SourceFunction<Tuple2<Long, Long>>() {

            // Cooperative-cancellation flag; the original while (true) loop ignored
            // cancel() entirely, so the source could never be stopped cleanly.
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
                // Emit one random (f0, f1) pair per second until cancelled.
                while (running) {
                    ThreadLocalRandom current = ThreadLocalRandom.current();
                    ctx.collect(Tuple2.of(current.nextLong(10, 100), current.nextLong(10, 100)));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        SingleOutputStreamOperator<Tuple2<Long, Long>> map = streamSource.map(new RichMapFunctionMap());

        map.print();

        env.execute();
    }

    /**
     * Demonstrates Operator State via {@link CheckpointedFunction}: buffers every mapped
     * element in memory and mirrors the buffer into a {@link ListState} on each checkpoint.
     * NOTE(review): bufferedElements is never trimmed, so memory grows without bound —
     * fine for a demo, not for production use.
     */
    public static class RichMapFunctionMap extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> implements CheckpointedFunction {

        // Handle to the operator list state registered in initializeState().
        private transient ListState<Tuple2<Long, Long>> checkpointedState;

        // In-memory buffer that is written into checkpointedState on every snapshot.
        private List<Tuple2<Long, Long>> bufferedElements = new ArrayList<>();

        @Override
        public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
            // Pass-through map that records every element it sees.
            bufferedElements.add(value);
            return value;
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {

            System.err.println("RichMapFunctionMap.snapshotState" + currentThread().getName());

            // 4. Copy the buffer into the snapshot. clear() first is essential: without it
            //    every checkpoint would append the whole buffer again, duplicating the
            //    operator state on each snapshot (see the CheckpointedFunction example in
            //    the Flink docs, which clears before re-adding).
            checkpointedState.clear();
            for (Tuple2<Long, Long> element : bufferedElements) {
                checkpointedState.add(element);
            }
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {

            System.err.println("RichMapFunctionMap.initializeState" + currentThread().getName());

            // 1. Describe the operator state (name + element serializer via TypeHint,
            //    since Tuple2's generic parameters are erased at runtime).
            ListStateDescriptor<Tuple2<Long, Long>> descriptor =
                    new ListStateDescriptor<>(
                            "buffered-elements",
                            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
                            }));

            // 2. Register / obtain the list state from the operator state store.
            checkpointedState = context.getOperatorStateStore().getListState(descriptor);

            // 3. On restore (i.e. after a failure/restart), reload the last snapshot
            //    into the in-memory buffer.
            if (context.isRestored()) {
                for (Tuple2<Long, Long> element : checkpointedState.get()) {
                    bufferedElements.add(element);
                }
            }
        }
    }

}
